跳到主要内容

Redis 大 key 用二级缓存优化

什么是 Redis 大 key 问题

Redis 大 key 是指存储在 Redis 中的单个 key 对应的 value 占用内存过大或包含元素过多的情况。这类问题在高并发场景下会严重影响 Redis 性能和稳定性。

典型大 key 场景

// 1. 用户画像数据 - Hash 类型
userProfile := map[string]string{
"user_id": "12345",
"name": "张三",
"age": "25",
"city": "北京",
// ... 数千个属性字段
"preference_1000": "value_1000",
}

// 2. 商品详情缓存 - String 类型
productDetail := `{
"id": 12345,
"images": ["url1", "url2", ...], // 包含大量图片URL
"description": "超长商品描述...", // 几MB的富文本
"reviews": [...] // 数千条评论数据
}`

// 3. 实时排行榜 - ZSet 类型
leaderboard := []struct{
Score float64
Member string
}{
{9999.0, "player1"},
{9998.0, "player2"},
// ... 百万级别的排行数据
}

大 key 带来的问题

性能问题

主要性能影响

  1. 操作阻塞:大 key 的读写操作会阻塞 Redis 主线程
  2. 网络拥塞:大量数据传输占用网络带宽
  3. 内存碎片:频繁的大对象分配释放导致内存碎片
  4. 慢查询:大 key 操作容易触发慢查询告警

可用性问题

// 大 key 删除示例 - 可能导致阻塞
func deleteBigKey(client *redis.Client, key string) error {
// 危险操作:直接删除大 key 可能阻塞数秒
return client.Del(context.Background(), key).Err()
}

// 安全的删除方式
func safeDeletBigKey(client *redis.Client, key string) error {
keyType := client.Type(context.Background(), key).Val()

switch keyType {
case "hash":
return deleteHashGradually(client, key)
case "list":
return deleteListGradually(client, key)
case "set":
return deleteSetGradually(client, key)
default:
return client.Del(context.Background(), key).Err()
}
}

二级缓存优化原理

二级缓存是通过在应用程序内存中建立本地缓存层,减少对 Redis 大 key 的直接访问频率,从而缓解大 key 问题的优化策略。

核心优化机制

数据分层存储

  • L1 缓存:应用进程内存,访问速度最快(纳秒级)
  • L2 缓存:Redis 远程缓存,访问速度中等(毫秒级)
  • 数据源:数据库等持久化存储,访问速度最慢(10-100毫秒)

缓存命中率优化

type CacheMetrics struct {
L1Hits int64 // 本地缓存命中次数
L2Hits int64 // Redis 缓存命中次数
TotalReqs int64 // 总请求次数

L1HitRate float64 // L1 命中率
L2HitRate float64 // L2 命中率
}

func (m *CacheMetrics) UpdateHitRate() {
m.L1HitRate = float64(m.L1Hits) / float64(m.TotalReqs)
m.L2HitRate = float64(m.L2Hits) / float64(m.TotalReqs)
}

分布式二级缓存同步机制

在分布式环境中,多个应用实例都有本地缓存,当数据更新时需要保证所有实例的缓存一致性。

基于消息队列的同步方案

Redis Pub/Sub 实现

type DistributedCacheManager struct {
localCache LocalCache
redisClient *redis.Client
pubsub *redis.PubSub
nodeID string
updateChan chan CacheUpdateEvent
}

type CacheUpdateEvent struct {
Type string `json:"type"` // "delete", "update", "clear"
Keys []string `json:"keys"` // 受影响的缓存键
Value interface{} `json:"value"` // 更新的值(仅update类型)
NodeID string `json:"node_id"` // 发起更新的节点ID
Timestamp int64 `json:"timestamp"` // 时间戳
Version int64 `json:"version"` // 版本号
}

func NewDistributedCacheManager(redisClient *redis.Client, nodeID string) *DistributedCacheManager {
dcm := &DistributedCacheManager{
localCache: NewLRUCache(1000),
redisClient: redisClient,
nodeID: nodeID,
updateChan: make(chan CacheUpdateEvent, 1000),
}

// 订阅缓存更新通道
dcm.pubsub = redisClient.Subscribe(context.Background(), "cache:invalidate")

// 启动同步协程
go dcm.startSyncWorker()
go dcm.startUpdateListener()

return dcm
}

func (dcm *DistributedCacheManager) startUpdateListener() {
ch := dcm.pubsub.Channel()

for msg := range ch {
var event CacheUpdateEvent
if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
continue
}

// 忽略自己发送的消息
if event.NodeID == dcm.nodeID {
continue
}

// 处理缓存更新事件
dcm.handleCacheEvent(event)
}
}

func (dcm *DistributedCacheManager) handleCacheEvent(event CacheUpdateEvent) {
switch event.Type {
case "delete":
for _, key := range event.Keys {
dcm.localCache.Delete(key)
}

case "update":
for _, key := range event.Keys {
if event.Value != nil {
dcm.localCache.Set(key, event.Value, 10*time.Minute)
}
}

case "clear":
dcm.localCache.Clear()

case "refresh":
// 强制刷新指定key
for _, key := range event.Keys {
go dcm.refreshFromRedis(key)
}
}
}

func (dcm *DistributedCacheManager) broadcastCacheEvent(event CacheUpdateEvent) {
event.NodeID = dcm.nodeID
event.Timestamp = time.Now().Unix()

data, err := json.Marshal(event)
if err != nil {
return
}

// 发布到Redis频道
dcm.redisClient.Publish(context.Background(), "cache:invalidate", string(data))
}

基于 Kafka 的可靠同步

对于对一致性要求更高的场景,可以使用 Kafka 替代 Redis Pub/Sub:

type KafkaCacheSync struct {
producer sarama.SyncProducer
consumer sarama.Consumer
localCache LocalCache
nodeID string
topic string
}

func (kcs *KafkaCacheSync) PublishCacheEvent(event CacheUpdateEvent) error {
event.NodeID = kcs.nodeID
event.Timestamp = time.Now().Unix()

data, err := json.Marshal(event)
if err != nil {
return err
}

message := &sarama.ProducerMessage{
Topic: kcs.topic,
Key: sarama.StringEncoder(event.Keys[0]), // 使用第一个key作为分区键
Value: sarama.ByteEncoder(data),
Headers: []sarama.RecordHeader{
{
Key: []byte("node_id"),
Value: []byte(kcs.nodeID),
},
{
Key: []byte("event_type"),
Value: []byte(event.Type),
},
},
}

_, _, err = kcs.producer.SendMessage(message)
return err
}

func (kcs *KafkaCacheSync) StartConsumer(ctx context.Context) error {
partitionConsumer, err := kcs.consumer.ConsumePartition(kcs.topic, 0, sarama.OffsetNewest)
if err != nil {
return err
}

go func() {
defer partitionConsumer.Close()

for {
select {
case msg := <-partitionConsumer.Messages():
kcs.handleKafkaMessage(msg)
case <-ctx.Done():
return
}
}
}()

return nil
}

func (kcs *KafkaCacheSync) handleKafkaMessage(msg *sarama.ConsumerMessage) {
// 检查消息来源,避免处理自己发送的消息
for _, header := range msg.Headers {
if string(header.Key) == "node_id" && string(header.Value) == kcs.nodeID {
return
}
}

var event CacheUpdateEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return
}

// 处理缓存更新事件
kcs.handleCacheEvent(event)
}

最终一致性保证

定期全量同步

type CacheConsistencyChecker struct {
dcm *DistributedCacheManager
checkInterval time.Duration
batchSize int
}

func (ccc *CacheConsistencyChecker) StartPeriodicCheck() {
ticker := time.NewTicker(ccc.checkInterval)
defer ticker.Stop()

for range ticker.C {
ccc.performConsistencyCheck()
}
}

func (ccc *CacheConsistencyChecker) performConsistencyCheck() {
// 1. 获取本地缓存的所有key
localKeys := ccc.dcm.localCache.Keys()

// 2. 分批检查版本一致性
for i := 0; i < len(localKeys); i += ccc.batchSize {
end := i + ccc.batchSize
if end > len(localKeys) {
end = len(localKeys)
}

batch := localKeys[i:end]
ccc.checkBatchConsistency(batch)
}
}

func (ccc *CacheConsistencyChecker) checkBatchConsistency(keys []string) {
// 使用pipeline获取Redis中的版本信息
pipe := ccc.dcm.redisClient.Pipeline()

for _, key := range keys {
pipe.HGet(context.Background(), key+":meta", "version")
}

results, err := pipe.Exec(context.Background())
if err != nil {
return
}

for i, key := range keys {
redisVersion, err := results[i].(*redis.StringCmd).Int64()
if err != nil {
continue
}

// 获取本地缓存版本
if localItem, ok := ccc.dcm.localCache.Get(key); ok {
if versionedItem, ok := localItem.(VersionedCacheItem); ok {
if versionedItem.Version < redisVersion {
// 本地缓存版本过旧,刷新
go ccc.dcm.refreshFromRedis(key)
}
}
}
}
}

本地缓存设计与实现

缓存数据结构选择

针对不同的大 key 类型,本地缓存需要采用不同的数据结构:

// 本地缓存接口定义
type LocalCache interface {
Get(key string) (interface{}, bool)
Set(key string, value interface{}, ttl time.Duration)
Delete(key string)
Size() int
Clear()
}

// 基于 sync.Map 的线程安全实现
type SyncMapCache struct {
data sync.Map
ttlMap sync.Map // 存储过期时间
maxSize int
}

func (c *SyncMapCache) Get(key string) (interface{}, bool) {
// 检查是否过期
if expireTime, ok := c.ttlMap.Load(key); ok {
if time.Now().After(expireTime.(time.Time)) {
c.Delete(key)
return nil, false
}
}

return c.data.Load(key)
}

func (c *SyncMapCache) Set(key string, value interface{}, ttl time.Duration) {
c.data.Store(key, value)
if ttl > 0 {
c.ttlMap.Store(key, time.Now().Add(ttl))
}
}

内存管理策略

LRU 淘汰机制

type LRUCache struct {
capacity int
cache map[string]*Node
head *Node
tail *Node
mutex sync.RWMutex
}

type Node struct {
key string
value interface{}
prev *Node
next *Node
size int64 // 对象占用内存大小
}

func (c *LRUCache) moveToHead(node *Node) {
c.removeNode(node)
c.addToHead(node)
}

func (c *LRUCache) evictLRU() {
// 淘汰最少使用的节点
last := c.tail.prev
c.removeNode(last)
delete(c.cache, last.key)
}

内存限制控制

type MemoryAwareCache struct {
maxMemory int64 // 最大内存限制
currentMemory int64 // 当前使用内存
cache map[string]*CacheItem
mutex sync.RWMutex
}

type CacheItem struct {
Value interface{}
Size int64
ExpireAt time.Time
AccessTime time.Time
}

func (c *MemoryAwareCache) Set(key string, value interface{}, ttl time.Duration) error {
size := calculateObjectSize(value)

c.mutex.Lock()
defer c.mutex.Unlock()

// 检查内存限制
if c.currentMemory + size > c.maxMemory {
if err := c.evictToFit(size); err != nil {
return err
}
}

c.cache[key] = &CacheItem{
Value: value,
Size: size,
ExpireAt: time.Now().Add(ttl),
AccessTime: time.Now(),
}
c.currentMemory += size

return nil
}

缓存一致性保证

数据更新策略

Cache-Aside 模式实现

type UserService struct {
cache LocalCache
redisClient *redis.Client
db *sql.DB
mq MessageQueue
}

func (s *UserService) GetUserProfile(userID string) (*UserProfile, error) {
cacheKey := fmt.Sprintf("user:profile:%s", userID)

// 1. 尝试从本地缓存获取
if value, ok := s.cache.Get(cacheKey); ok {
return value.(*UserProfile), nil
}

// 2. 尝试从 Redis 获取
data, err := s.redisClient.Get(context.Background(), cacheKey).Result()
if err == nil {
var profile UserProfile
if err := json.Unmarshal([]byte(data), &profile); err == nil {
// 更新本地缓存
s.cache.Set(cacheKey, &profile, 10*time.Minute)
return &profile, nil
}
}

// 3. 从数据库查询
profile, err := s.queryUserFromDB(userID)
if err != nil {
return nil, err
}

// 4. 更新缓存
go s.updateCaches(cacheKey, profile)

return profile, nil
}

func (s *UserService) UpdateUserProfile(userID string, profile *UserProfile) error {
cacheKey := fmt.Sprintf("user:profile:%s", userID)

// 1. 更新数据库
if err := s.updateUserInDB(userID, profile); err != nil {
return err
}

// 2. 删除缓存(延迟双删策略)
s.cache.Delete(cacheKey)
s.redisClient.Del(context.Background(), cacheKey)

// 3. 发送更新消息给其他实例
go s.mq.Publish("user.profile.updated", map[string]interface{}{
"user_id": userID,
"action": "delete_cache",
})

// 4. 延迟删除(处理并发更新问题)
time.AfterFunc(500*time.Millisecond, func() {
s.cache.Delete(cacheKey)
s.redisClient.Del(context.Background(), cacheKey)
})

return nil
}

分布式一致性

消息广播机制

type CacheInvalidator struct {
localCache LocalCache
subscriber MessageSubscriber
}

func (c *CacheInvalidator) StartListening() {
c.subscriber.Subscribe("cache.invalidate", func(message Message) {
var event struct {
Keys []string `json:"keys"`
Action string `json:"action"`
}

if err := json.Unmarshal(message.Data, &event); err != nil {
return
}

switch event.Action {
case "delete":
for _, key := range event.Keys {
c.localCache.Delete(key)
}
case "clear":
c.localCache.Clear()
}
})
}

分片策略处理大 Hash

对于超大的 Hash 类型大 key,可以采用分片策略将其拆分为多个小的 key:

分片实现

type ShardedHashCache struct {
shardCount int
localCaches []LocalCache
redisClient *redis.Client
}

func (s *ShardedHashCache) hashUserID(userID string) int {
hash := fnv.New32a()
hash.Write([]byte(userID))
return int(hash.Sum32()) % s.shardCount
}

func (s *ShardedHashCache) GetUserField(userID, field string) (string, error) {
shardIndex := s.hashUserID(userID)
cacheKey := fmt.Sprintf("user:profile:%s:%d", userID, shardIndex)

// 1. 尝试从对应分片的本地缓存获取
if value, ok := s.localCaches[shardIndex].Get(cacheKey); ok {
if profileMap, ok := value.(map[string]string); ok {
if fieldValue, exists := profileMap[field]; exists {
return fieldValue, nil
}
}
}

// 2. 从 Redis 分片获取
redisKey := fmt.Sprintf("user:profile:%s:%d", userID, shardIndex)
value, err := s.redisClient.HGet(context.Background(), redisKey, field).Result()
if err == redis.Nil {
return "", nil
}
if err != nil {
return "", err
}

// 3. 更新本地缓存(懒加载整个分片)
go s.loadShardToLocal(userID, shardIndex)

return value, nil
}

func (s *ShardedHashCache) loadShardToLocal(userID string, shardIndex int) {
redisKey := fmt.Sprintf("user:profile:%s:%d", userID, shardIndex)

// 获取整个分片数据
profileMap, err := s.redisClient.HGetAll(context.Background(), redisKey).Result()
if err != nil {
return
}

// 更新到对应的本地缓存分片
cacheKey := fmt.Sprintf("user:profile:%s:%d", userID, shardIndex)
s.localCaches[shardIndex].Set(cacheKey, profileMap, 15*time.Minute)
}

性能监控与调优

缓存性能指标

type CacheMetrics struct {
// 命中率指标
L1Hits int64 `json:"l1_hits"`
L2Hits int64 `json:"l2_hits"`
Misses int64 `json:"misses"`
TotalRequests int64 `json:"total_requests"`

// 性能指标
AvgL1Latency time.Duration `json:"avg_l1_latency"`
AvgL2Latency time.Duration `json:"avg_l2_latency"`
AvgDBLatency time.Duration `json:"avg_db_latency"`

// 内存指标
L1MemoryUsage int64 `json:"l1_memory_usage"`
L2MemoryUsage int64 `json:"l2_memory_usage"`

// 错误指标
L1Errors int64 `json:"l1_errors"`
L2Errors int64 `json:"l2_errors"`
DBErrors int64 `json:"db_errors"`
}

func (m *CacheMetrics) CalculateHitRates() (float64, float64, float64) {
if m.TotalRequests == 0 {
return 0, 0, 0
}

l1Rate := float64(m.L1Hits) / float64(m.TotalRequests)
l2Rate := float64(m.L2Hits) / float64(m.TotalRequests)
missRate := float64(m.Misses) / float64(m.TotalRequests)

return l1Rate, l2Rate, missRate
}

自动调优策略

type AutoTuner struct {
cache *TieredCache
metrics *CacheMetrics
config *TunerConfig
lastAdjustment time.Time
}

type TunerConfig struct {
MinL1HitRate float64 `json:"min_l1_hit_rate"`
MaxMemoryUsage int64 `json:"max_memory_usage"`
AdjustmentInterval time.Duration `json:"adjustment_interval"`
TTLAdjustmentFactor float64 `json:"ttl_adjustment_factor"`
}

func (t *AutoTuner) Run() {
ticker := time.NewTicker(t.config.AdjustmentInterval)
defer ticker.Stop()

for range ticker.C {
if time.Since(t.lastAdjustment) < t.config.AdjustmentInterval {
continue
}

l1Rate, l2Rate, _ := t.metrics.CalculateHitRates()

// 根据命中率调整策略
if l1Rate < t.config.MinL1HitRate {
t.increaseCacheSize()
}

// 根据内存使用率调整策略
if t.metrics.L1MemoryUsage > t.config.MaxMemoryUsage {
t.adjustTTL()
}

t.lastAdjustment = time.Now()
}
}

func (t *AutoTuner) increaseCacheSize() {
// 动态增加本地缓存容量
currentSize := t.cache.GetMaxSize()
newSize := int(float64(currentSize) * 1.2) // 增加20%
t.cache.SetMaxSize(newSize)
}

func (t *AutoTuner) adjustTTL() {
// 缩短 TTL 以减少内存占用
currentTTL := t.cache.GetDefaultTTL()
newTTL := time.Duration(float64(currentTTL) * t.config.TTLAdjustmentFactor)
t.cache.SetDefaultTTL(newTTL)
}

实际应用场景

电商商品详情缓存

type ProductDetailCache struct {
localCache LocalCache
redisClient *redis.Client
db *sql.DB
}

func (p *ProductDetailCache) GetProductDetail(productID string) (*ProductDetail, error) {
// 商品详情通常包含大量图片、描述、评论等数据
cacheKey := fmt.Sprintf("product:detail:%s", productID)

// 本地缓存优先,减少 Redis 网络开销
if detail, ok := p.localCache.Get(cacheKey); ok {
return detail.(*ProductDetail), nil
}

// Redis 二级缓存
if data, err := p.redisClient.Get(context.Background(), cacheKey).Result(); err == nil {
var detail ProductDetail
if json.Unmarshal([]byte(data), &detail) == nil {
// 异步更新本地缓存,避免阻塞
go p.localCache.Set(cacheKey, &detail, 5*time.Minute)
return &detail, nil
}
}

// 数据库兜底
detail, err := p.queryProductFromDB(productID)
if err != nil {
return nil, err
}

// 异步更新多级缓存
go p.updateAllCaches(cacheKey, detail)

return detail, nil
}

用户画像数据缓存

type UserProfileCache struct {
shardedCache *ShardedHashCache
bloomFilter *BloomFilter
}

func (u *UserProfileCache) GetUserProfile(userID string) (*UserProfile, error) {
// 使用布隆过滤器预判用户是否存在
if !u.bloomFilter.Test([]byte(userID)) {
return nil, errors.New("user not found")
}

// 分片获取用户画像数据
profile := &UserProfile{}

// 并发获取各个分片数据
var wg sync.WaitGroup
errChan := make(chan error, 3)

// 基础信息分片
wg.Add(1)
go func() {
defer wg.Done()
basicInfo, err := u.shardedCache.GetUserField(userID, "basic_info")
if err != nil {
errChan <- err
return
}
json.Unmarshal([]byte(basicInfo), &profile.BasicInfo)
}()

// 偏好设置分片
wg.Add(1)
go func() {
defer wg.Done()
preferences, err := u.shardedCache.GetUserField(userID, "preferences")
if err != nil {
errChan <- err
return
}
json.Unmarshal([]byte(preferences), &profile.Preferences)
}()

// 行为数据分片
wg.Add(1)
go func() {
defer wg.Done()
behavior, err := u.shardedCache.GetUserField(userID, "behavior")
if err != nil {
errChan <- err
return
}
json.Unmarshal([]byte(behavior), &profile.Behavior)
}()

wg.Wait()
close(errChan)

// 检查是否有错误
if len(errChan) > 0 {
return nil, <-errChan
}

return profile, nil
}

通过合理设计二级缓存架构,可以显著缓解 Redis 大 key 问题,提升系统整体性能。关键在于根据业务特点选择合适的缓存策略、分片方案和一致性保证机制,同时建立完善的监控和自动调优体系。